{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Millionaire Odds vs. Hit by a Bus: An ES|QL Analysis"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Python modules"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Reinstall the Elasticsearch client at a pinned version alongside the analysis stack.\n",
    "# %pip (rather than !pip) installs into the environment of the running kernel.\n",
    "# The scientific package imported later in this notebook is `scipy`.\n",
    "%pip cache purge\n",
    "%pip uninstall -y elasticsearch\n",
    "%pip install elasticsearch==8.14 pandas matplotlib scipy"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Loading Synthetic data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "from elasticsearch import Elasticsearch, helpers\n",
    "\n",
    "# Initialize the Elasticsearch client for an Elastic Cloud deployment.\n",
    "# A Cloud ID must be passed as `cloud_id`; `hosts` expects node URLs\n",
    "# (for example \"https://host:9243\") and cannot parse a Cloud ID.\n",
    "client = Elasticsearch(\n",
    "    cloud_id=\"YOUR_DEPLOYMENT_CLOUD_ID\",\n",
    "    api_key=\"YOUR_DEPLOYMENT_API_KEY\",\n",
    ")\n",
    "\n",
    "\n",
    "# Build a reproducible synthetic population with heavily right-skewed wealth\n",
    "num_records = 500_000\n",
    "np.random.seed(42)  # fixed seed so every run produces the same sample\n",
    "\n",
    "# The draw order (ages, incomes, net worths) determines the values produced by the seed\n",
    "ages = np.random.randint(20, 80, size=num_records)  # integer ages 20-79; the upper bound is exclusive\n",
    "incomes = np.random.exponential(scale=10_000, size=num_records)  # exponential incomes\n",
    "net_worths = np.random.exponential(scale=100_000_000, size=num_records)  # much wider exponential range\n",
    "\n",
    "# Cap net worth to the range [0, 100 billion]; clipping limits values, it does not rescale them\n",
    "net_worths = np.clip(net_worths, 0, 100_000_000_000)\n",
    "\n",
    "# Assemble the records; \"counter\" mirrors \"id\" and is the field used for pagination\n",
    "record_numbers = range(1, num_records + 1)\n",
    "df = pd.DataFrame(\n",
    "    {\n",
    "        \"id\": record_numbers,\n",
    "        \"age\": ages,\n",
    "        \"income\": incomes,\n",
    "        \"net_worth\": net_worths,\n",
    "        \"counter\": record_numbers,\n",
    "    }\n",
    ")\n",
    "\n",
    "# Start from a clean index: drop any previous copy, then recreate it.\n",
    "# Status codes to tolerate are set through `.options(ignore_status=...)`;\n",
    "# passing `ignore=` directly to an API method is deprecated in the 8.x client.\n",
    "index_name = \"raw_wealth_data_large\"\n",
    "client.options(ignore_status=[400, 404]).indices.delete(index=index_name)\n",
    "client.options(ignore_status=400).indices.create(index=index_name)\n",
    "\n",
    "\n",
    "def generator(df):\n",
    "    \"\"\"Yield one bulk-index action per DataFrame row, targeting `index_name`.\n",
    "\n",
    "    Rows are converted with `to_dict(orient=\"records\")` instead of `iterrows()`:\n",
    "    `iterrows()` builds a Series per row, which upcasts mixed int/float columns to\n",
    "    float (so `id`, `age` and `counter` would be indexed as 1.0, 2.0, ...) and is\n",
    "    slow on large frames.\n",
    "    \"\"\"\n",
    "    for record in df.to_dict(orient=\"records\"):\n",
    "        yield {\"_index\": index_name, \"_source\": record}\n",
    "\n",
    "\n",
    "helpers.bulk(client, generator(df))\n",
    "\n",
    "# Make the new documents visible to search before any query runs against the index\n",
    "client.indices.refresh(index=index_name)\n",
    "\n",
    "print(\"Data indexed successfully.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pulling data with ES|QL and creating a dataframe"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "from elasticsearch import Elasticsearch, helpers\n",
    "from io import StringIO\n",
    "import matplotlib.pyplot as plt\n",
    "from scipy.stats import pareto\n",
    "\n",
    "\n",
    "# Run a single ES|QL query and load its CSV result into pandas\n",
    "def execute_esql_query(query):\n",
    "    \"\"\"Execute `query` through the ES|QL endpoint and return the rows as a DataFrame.\n",
    "\n",
    "    The result is requested in CSV format and parsed with `pd.read_csv`.\n",
    "    \"\"\"\n",
    "    csv_result = client.esql.query(query=query, format=\"csv\")\n",
    "    csv_buffer = StringIO(csv_result.body)\n",
    "    return pd.read_csv(csv_buffer)\n",
    "\n",
    "\n",
    "# Function to fetch paginated data using the counter field\n",
    "def fetch_paginated_data(index, num_records, size=10000):\n",
    "    \"\"\"Fetch documents from `index` in windows of `size` consecutive `counter` values.\n",
    "\n",
    "    Args:\n",
    "        index: Name of the index to read from.\n",
    "        num_records: Highest `counter` value to request (windows start at 1).\n",
    "        size: Number of `counter` values, and maximum rows, per ES|QL query.\n",
    "\n",
    "    Returns:\n",
    "        A single DataFrame with all pages concatenated and a fresh 0..n-1 index;\n",
    "        an empty DataFrame when no page is requested.\n",
    "    \"\"\"\n",
    "    chunks = []\n",
    "    for start in range(1, num_records + 1, size):\n",
    "        # Clamp the last window so it never asks for counters above num_records\n",
    "        end = min(start + size - 1, num_records)\n",
    "        query = f\"\"\"\n",
    "        FROM {index}\n",
    "        | WHERE counter >= {start} AND counter <= {end}\n",
    "        | limit {size}\n",
    "        \"\"\"\n",
    "        chunks.append(execute_esql_query(query))\n",
    "    if not chunks:\n",
    "        return pd.DataFrame()\n",
    "    # Concatenate once: growing a DataFrame inside the loop re-copies all rows on every page\n",
    "    return pd.concat(chunks, ignore_index=True)\n",
    "\n",
    "\n",
    "# Retrieve the complete data set from Elasticsearch, one ES|QL page at a time\n",
    "num_records = 500_000\n",
    "all_data_df = fetch_paginated_data(index_name, num_records)\n",
    "print(f\"Total Data Retrieved: {len(all_data_df)} records\")\n",
    "\n",
    "# Estimate Pareto parameters from the observed net worths, with the location fixed at 0\n",
    "net_worth_values = all_data_df[\"net_worth\"]\n",
    "shape, loc, scale = pareto.fit(net_worth_values, floc=0)\n",
    "\n",
    "# Evaluate the fitted density at every observation\n",
    "all_data_df[\"net_worth_probability\"] = pareto.pdf(net_worth_values, shape, loc=loc, scale=scale)\n",
    "\n",
    "# Rescale the densities so the column sums to 1\n",
    "probability_total = all_data_df[\"net_worth_probability\"].sum()\n",
    "all_data_df[\"net_worth_probability\"] = all_data_df[\"net_worth_probability\"] / probability_total\n",
    "\n",
    "print(\"Data with Net Worth Probability:\")\n",
    "print(all_data_df.head())\n",
    "\n",
    "# Locate the first row at which the running probability total reaches the bus-hit probability\n",
    "target_probability = 0.0000181\n",
    "cumulative_probability = all_data_df[\"net_worth_probability\"].cumsum()\n",
    "reaches_target = cumulative_probability >= target_probability\n",
    "target_net_worth_df = all_data_df[reaches_target].head(1)\n",
    "target_net_worth = target_net_worth_df[\"net_worth\"].iloc[0]\n",
    "print(f\"Net Worth with Probability >= {target_probability}: {target_net_worth}\")\n",
    "\n",
    "# Draw the empirical histogram, the fitted Pareto density and the two reference lines\n",
    "fig, ax = plt.subplots(figsize=(10, 6))\n",
    "ax.hist(all_data_df[\"net_worth\"], bins=100, density=True, alpha=0.6, color=\"g\", label=\"Empirical Data\")\n",
    "\n",
    "# Evaluate the fitted density across the x-range established by the histogram\n",
    "xmin, xmax = ax.get_xlim()\n",
    "x = np.linspace(xmin, xmax, 100)\n",
    "p = pareto.pdf(x, shape, loc=loc, scale=scale)\n",
    "ax.plot(x, p, \"k\", linewidth=2, label=\"Fitted Pareto Distribution\")\n",
    "\n",
    "# Horizontal line at the bus-hit probability, vertical line at the matching net worth\n",
    "ax.axhline(y=target_probability, color=\"r\", linestyle=\"--\", label=\"Bus Hit Probability\")\n",
    "ax.axvline(x=target_net_worth, color=\"g\", linestyle=\"--\", label=f\"Net Worth = {target_net_worth:.2f}\")\n",
    "\n",
    "ax.set_xlabel(\"Net Worth\")\n",
    "ax.set_ylabel(\"Probability\")\n",
    "ax.set_title(\"Net Worth Probability Distribution\")\n",
    "ax.legend()\n",
    "ax.grid(True)\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Fit Pareto Distribution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import matplotlib.pyplot as plt\n",
    "from scipy.stats import pareto\n",
    "\n",
    "# Requires all_data_df (net worth data fetched from Elasticsearch) from the previous cell\n",
    "# Re-estimate the Pareto parameters with the location fixed at 0\n",
    "shape, loc, scale = pareto.fit(all_data_df[\"net_worth\"], floc=0)\n",
    "\n",
    "# Empirical histogram of net worth, normalized to a density\n",
    "fig, ax = plt.subplots(figsize=(10, 6))\n",
    "ax.hist(all_data_df[\"net_worth\"], bins=100, density=True, alpha=0.6, color=\"g\", label=\"Empirical Data\")\n",
    "\n",
    "# Fitted Pareto density evaluated across the histogram's x-range\n",
    "xmin, xmax = ax.get_xlim()\n",
    "x = np.linspace(xmin, xmax, 100)\n",
    "p = pareto.pdf(x, shape, loc=loc, scale=scale)\n",
    "ax.plot(x, p, \"k\", linewidth=2, label=\"Fitted Pareto Distribution\")\n",
    "\n",
    "# Label and render the figure\n",
    "ax.set_xlabel(\"Net Worth\")\n",
    "ax.set_ylabel(\"Probability\")\n",
    "ax.set_title(\"Net Worth Probability Distribution\")\n",
    "ax.legend()\n",
    "ax.grid(True)\n",
    "plt.show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
